package com.burning.demo.flink.TableAPIAndSQL;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.ExecutionException;

/**
 * Demo jobs that register two MySQL tables through the Flink JDBC connector and run an
 * {@code INSERT INTO ... SELECT} from the first table into the second.
 *
 * <p>{@link #testSum(String[])} and {@link #testByStream(String[])} count the rows of
 * {@code table1} per key (platform id concatenated with the {@code yyyyMMdd} write date)
 * and keep only the groups whose date is {@code 20210824}. {@link #testByBatch(String[])}
 * counts the rows matching one fixed {@code ebidsun_data_id}.
 *
 * <p>NOTE(review): the JDBC endpoint, user name and password are committed in this file.
 * They are kept here only to preserve the existing behaviour; they should be moved to
 * external configuration and the password rotated.
 *
 * @author cai
 * @date 2021/9/1 14:22
 */
public class FlinkSqlTest {

    // Connection settings shared by every table definition below (see NOTE(review) above).
    private static final String JDBC_URL =
            "jdbc:mysql://47.117.34.236:3306/bidsun_ecs?characterEncoding=UTF8";
    private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    private static final String JDBC_USERNAME = "services";
    private static final String JDBC_PASSWORD = "#MYsBvVB";

    // Names under which the tables are registered in the Flink catalog.
    private static final String SOURCE_TABLE = "table1";
    private static final String SINK_TABLE = "ads_gzh_trade_count_daily";

    // Names of the backing MySQL tables.
    private static final String SOURCE_PHYSICAL_TABLE =
            "catalog_main_data_fa3f9857ffb84e772c1889bfb41ab973";
    private static final String SINK_PHYSICAL_TABLE = "ads_gzh_trade_count_daily_test";

    private static final String SOURCE_COLUMNS =
            "    ebidsun_data_id varchar,\n" +
            "    ebidsun_platform_id VARCHAR,\n" +
            "    assessmentPrice_1 double,\n" +
            "    priceUnit_1 VARCHAR,\n" +
            "    writeTime_1 timestamp,\n" +
            "    PLATFORM_NAME_1 VARCHAR\n";

    // Flink does not own the data, so a primary key must be declared NOT ENFORCED;
    // a bare "primary key" is rejected when the DDL is validated.
    private static final String STREAM_SINK_COLUMNS =
            "    id varchar primary key not enforced,\n" +
            "    platform_code varchar,\n" +
            "    trade_count int\n";

    private static final String BATCH_SINK_COLUMNS =
            "    id varchar,\n" +
            "    platform_code varchar,\n" +
            "    platform_name varchar,\n" +
            "    business_type varchar,\n" +
            "    trade_count int\n";

    /**
     * Entry point; runs {@link #testSum(String[])}.
     *
     * @param args command-line arguments, passed through unchanged
     * @throws Exception if the job cannot be submitted or fails
     */
    public static void main(String[] args) throws Exception {
        testSum(args);
    }

    /**
     * Runs the per-key row count on a {@link StreamTableEnvironment}, formatting
     * {@code writeTime_1} directly, and blocks until the insert job has finished.
     *
     * @param args unused
     * @throws Exception if a statement is rejected, or the insert job fails or is interrupted
     */
    public static void testSum(String[] args) throws Exception {
        StreamTableEnvironment tEnv = createStreamTableEnvironment();
        registerStreamTables(tEnv);
        // executeSql only submits the INSERT job; wait for it so the caller does not
        // return (and a local JVM does not exit) before the rows are written.
        tEnv.executeSql(dailyTradeCountInsert("writeTime_1")).await();
    }

    /**
     * Same job as {@link #testSum(String[])}, except that {@code writeTime_1} is explicitly
     * cast to {@code TIMESTAMP} before it is formatted. Blocks until the insert job has
     * finished.
     *
     * @param args unused
     * @throws Exception if a statement is rejected, or the insert job fails or is interrupted
     */
    public static void testByStream(String[] args) throws Exception {
        StreamTableEnvironment tEnv = createStreamTableEnvironment();
        registerStreamTables(tEnv);
        tEnv.executeSql(dailyTradeCountInsert("CAST(writeTime_1 as TIMESTAMP)")).await();
    }

    /**
     * Counts the rows of {@code table1} with one fixed {@code ebidsun_data_id} on a
     * {@link BatchTableEnvironment} and writes the single result row to the sink table.
     *
     * <p>The statements are run with {@code executeSql}: the previously used
     * {@code sqlUpdate} is deprecated and only buffers an INSERT until {@code execute()}
     * is called, which never happened, so the insert was never run.
     *
     * <p>NOTE(review): {@code BatchTableEnvironment} is backed by the legacy planner;
     * confirm that it can resolve the {@code 'connector' = 'jdbc'} option style used here.
     *
     * @param args unused
     * @throws IllegalStateException if the insert job fails or the wait is interrupted
     */
    public static void testByBatch(String[] args) {
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // The batch variant never declared the 'driver' option; keep it that way.
        fbTableEnv.executeSql(
                jdbcTableDdl(SOURCE_TABLE, SOURCE_COLUMNS, SOURCE_PHYSICAL_TABLE, false));
        fbTableEnv.executeSql(
                jdbcTableDdl(SINK_TABLE, BATCH_SINK_COLUMNS, SINK_PHYSICAL_TABLE, false));

        String insertSql = "insert into\n" +
                "    ads_gzh_trade_count_daily\n" +
                "select\n" +
                "    'A' as id,\n" +
                "    'B' as platform_code,\n" +
                "    'A' as platform_name,\n" +
                "    'A' as business_type,\n" +
                "    count(ebidsun_data_id) as trade_count\n" +
                "from\n" +
                "    table1\n" +
                "where\n" +
                "    ebidsun_data_id = '84430785644960153615ad4551b7d311eb899600163e217996'";

        TableResult insertResult = fbTableEnv.executeSql(insertSql);
        try {
            insertResult.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag; this method has no checked exceptions to rethrow.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the batch insert job", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Batch insert job failed", e);
        }
    }

    /** Creates a table environment on top of the default streaming execution environment. */
    private static StreamTableEnvironment createStreamTableEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        return StreamTableEnvironment.create(env);
    }

    /** Registers the source table and the three-column, keyed sink table. */
    private static void registerStreamTables(StreamTableEnvironment tEnv) {
        tEnv.executeSql(jdbcTableDdl(SOURCE_TABLE, SOURCE_COLUMNS, SOURCE_PHYSICAL_TABLE, true));
        tEnv.executeSql(jdbcTableDdl(SINK_TABLE, STREAM_SINK_COLUMNS, SINK_PHYSICAL_TABLE, true));
    }

    /**
     * Builds a {@code CREATE TABLE ... WITH (...)} statement for a JDBC-backed table.
     *
     * @param tableName     name to register in the Flink catalog
     * @param columns       column definitions, one per line, each line newline-terminated
     * @param physicalTable name of the backing database table
     * @param declareDriver whether to emit the {@code 'driver'} option
     * @return the complete DDL statement
     */
    private static String jdbcTableDdl(
            String tableName, String columns, String physicalTable, boolean declareDriver) {
        StringBuilder ddl = new StringBuilder()
                .append("CREATE TABLE ").append(tableName).append("(\n")
                .append(columns)
                .append(") WITH(\n")
                .append("    'connector' = 'jdbc',\n")
                .append("    'url' = '").append(JDBC_URL).append("',\n");
        if (declareDriver) {
            ddl.append("    'driver' = '").append(JDBC_DRIVER).append("',\n");
        }
        return ddl
                .append("    'username' = '").append(JDBC_USERNAME).append("',\n")
                .append("    'password' = '").append(JDBC_PASSWORD).append("',\n")
                .append("    'table-name' = '").append(physicalTable).append("'\n")
                .append(")")
                .toString();
    }

    /**
     * Builds the INSERT that groups {@code table1} by platform id + write date, sums a
     * constant 1 per row, and keeps the groups dated {@code 20210824}.
     *
     * @param writeTimeExpr SQL expression yielding the write timestamp to format
     * @return the complete INSERT statement
     */
    private static String dailyTradeCountInsert(String writeTimeExpr) {
        String dayExpr = "DATE_FORMAT(" + writeTimeExpr + ", 'yyyyMMdd')";
        return "insert into\n" +
                "    ads_gzh_trade_count_daily\n" +
                "select\n" +
                "    id,\n" +
                "    platform_code,\n" +
                "    trade_count\n" +
                "from\n" +
                "    (\n" +
                "        SELECT\n" +
                "            id,\n" +
                "            FIRST_VALUE(platform_code) as platform_code,\n" +
                "            FIRST_VALUE(date_str) as date_str,\n" +
                "            sum(line_count) as trade_count\n" +
                "        FROM\n" +
                "            (\n" +
                "                select\n" +
                "                    CONCAT(\n" +
                "                        ebidsun_platform_id,\n" +
                "                        " + dayExpr + ",\n" +
                "                        '1'\n" +
                "                    ) as id,\n" +
                "                    ebidsun_platform_id as platform_code,\n" +
                "                    'XXXX' as platform_name,\n" +
                "                    'AAAAA' as business_type,\n" +
                "                    1 as line_count,\n" +
                "                    " + dayExpr + " as date_str\n" +
                "                from\n" +
                "                    table1\n" +
                "            )\n" +
                "        group by\n" +
                "            id\n" +
                "    )\n" +
                "WHERE\n" +
                "    date_str = '20210824'";
    }
}
